{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "79724cfa",
   "metadata": {},
   "source": [
    "# XGBoost-Ray with Dask\n",
    "\n",
    "\n",
    "This notebook includes an example workflow using\n",
    "[XGBoost-Ray](https://docs.ray.io/en/latest/xgboost-ray.html) and\n",
    "[Dask](https://docs.dask.org/en/latest/) for distributed model training,\n",
    "hyperparameter optimization, and prediction.\n",
    "\n",
    "## Cluster Setup\n",
    "\n",
    "First, we'll set up our Ray Cluster. The provided `dask_xgboost.yaml`\n",
    "cluster config can be used to set up an AWS cluster with 64 CPUs.\n",
    "\n",
    "The following steps assume you are in a directory with both\n",
    "``dask_xgboost.yaml`` and this file saved as ``dask_xgboost.ipynb``.\n",
    "\n",
    "**Step 1:** Bring up the Ray cluster.\n",
    "\n",
    "```bash\n",
    "pip install ray boto3\n",
    "ray up dask_xgboost.yaml\n",
    "```\n",
    "**Step 2:** Move ``dask_xgboost.ipynb`` to the cluster and start Jupyter.\n",
    "\n",
    "```bash\n",
    "ray rsync_up dask_xgboost.yaml \"./dask_xgboost.ipynb\" \\\n",
    "    \"~/dask_xgboost.ipynb\"\n",
    "ray exec dask_xgboost.yaml --port-forward=9999 \"jupyter notebook \\\n",
    "    --port=9999\"\n",
    "```\n",
    "\n",
    "You can then access this notebook at the URL that is output:\n",
    "``http://localhost:9999/?token=<token>``\n",
    "\n",
    "## Python Setup\n",
    "\n",
    "First, we'll import all the libraries we'll be using. This step also helps us\n",
    "verify that the environment is configured correctly. If any of the imports\n",
    "are missing, an exception will be raised."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "166268b8",
   "metadata": {},
   "outputs": [],
   "source": [
    "import argparse\n",
    "import time\n",
    "\n",
    "import dask\n",
    "import dask.dataframe as dd\n",
    "from xgboost_ray import RayDMatrix, RayParams, train, predict\n",
    "\n",
    "import ray\n",
    "from ray import tune\n",
    "from ray.util.dask import ray_dask_get"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "05d4da07",
   "metadata": {},
   "source": [
    "Next, let's parse some arguments. This will be used for executing the ``.py``\n",
    "file, but not for the ``.ipynb``. If you are using the interactive notebook,\n",
    "you can directly override the arguments manually."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "77186e19",
   "metadata": {},
   "outputs": [],
   "source": [
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument(\n",
    "    \"--address\", type=str, default=\"auto\", help=\"The address to use for Ray.\"\n",
    ")\n",
    "parser.add_argument(\n",
    "    \"--smoke-test\",\n",
    "    action=\"store_true\",\n",
    "    help=\"Read a smaller dataset for quick testing purposes.\",\n",
    ")\n",
    "parser.add_argument(\n",
    "    \"--num-actors\", type=int, default=4, help=\"Sets number of actors for training.\"\n",
    ")\n",
    "parser.add_argument(\n",
    "    \"--cpus-per-actor\",\n",
    "    type=int,\n",
    "    default=6,\n",
    "    help=\"The number of CPUs per actor for training.\",\n",
    ")\n",
    "parser.add_argument(\n",
    "    \"--num-actors-inference\",\n",
    "    type=int,\n",
    "    default=16,\n",
    "    help=\"Sets number of actors for inference.\",\n",
    ")\n",
    "parser.add_argument(\n",
    "    \"--cpus-per-actor-inference\",\n",
    "    type=int,\n",
    "    default=2,\n",
    "    help=\"The number of CPUs per actor for inference.\",\n",
    ")\n",
    "# Ignore -f from ipykernel_launcher\n",
    "args, _ = parser.parse_known_args()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1c07e34c",
   "metadata": {},
   "source": [
    "Override these arguments as needed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a35a5f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "address = args.address\n",
    "smoke_test = args.smoke_test\n",
    "num_actors = args.num_actors\n",
    "cpus_per_actor = args.cpus_per_actor\n",
    "num_actors_inference = args.num_actors_inference\n",
    "cpus_per_actor_inference = args.cpus_per_actor_inference"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e4077845",
   "metadata": {},
   "source": [
    "## Connecting to the Ray cluster\n",
    "\n",
    "Now, let's connect our Python script to this newly deployed Ray cluster!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "57fc42c7",
   "metadata": {},
   "outputs": [],
   "source": [
    "if not ray.is_initialized():\n",
    "    ray.init(address=address)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2efcf435",
   "metadata": {},
   "source": [
    "## Data Preparation\n",
    "\n",
    "We will use the `HIGGS dataset from the UCI Machine Learning dataset\n",
    "repository <https://archive.ics.uci.edu/ml/datasets/HIGGS>`_. The HIGGS\n",
    "dataset consists of 11,000,000 samples and 28 attributes, which is large\n",
    "enough size to show the benefits of distributed computation.\n",
    "\n",
    "We set the Dask scheduler to ``ray_dask_get`` to use `Dask on Ray\n",
    "<https://docs.ray.io/en/latest/data/dask-on-ray.html>`_ backend."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e3e8e177",
   "metadata": {},
   "outputs": [],
   "source": [
    "LABEL_COLUMN = \"label\"\n",
    "if smoke_test:\n",
    "    # Test dataset with only 10,000 records.\n",
    "    FILE_URL = \"https://ray-ci-higgs.s3.us-west-2.amazonaws.com/simpleHIGGS\" \".csv\"\n",
    "else:\n",
    "    # Full dataset. This may take a couple of minutes to load.\n",
    "    FILE_URL = (\n",
    "        \"https://archive.ics.uci.edu/ml/machine-learning-databases\"\n",
    "        \"/00280/HIGGS.csv.gz\"\n",
    "    )\n",
    "colnames = [LABEL_COLUMN] + [\"feature-%02d\" % i for i in range(1, 29)]\n",
    "dask.config.set(scheduler=ray_dask_get)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2bdf8e7",
   "metadata": {},
   "outputs": [],
   "source": [
    "load_data_start_time = time.time()\n",
    "\n",
    "data = dd.read_csv(FILE_URL, names=colnames)\n",
    "data = data[sorted(colnames)]\n",
    "data = data.persist()\n",
    "\n",
    "load_data_end_time = time.time()\n",
    "load_data_duration = load_data_end_time - load_data_start_time\n",
    "print(f\"Dataset loaded in {load_data_duration} seconds.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "be214015",
   "metadata": {},
   "source": [
    "With the connection established, we can now create the Dask dataframe.\n",
    "\n",
    "We will split the data into a training set and a evaluation set using a 80-20\n",
    "proportion."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f84b5aca",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_df, eval_df = data.random_split([0.8, 0.2])\n",
    "train_df, eval_df = train_df.persist(), eval_df.persist()\n",
    "print(train_df, eval_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "657e7f56",
   "metadata": {},
   "source": [
    "## Distributed Training\n",
    "\n",
    "The ``train_xgboost`` function contains all of the logic necessary for\n",
    "training using XGBoost-Ray.\n",
    "\n",
    "Distributed training can not only speed up the process, but also allow you\n",
    "to use datasets that are to large to fit in memory of a single node. With\n",
    "distributed training, the dataset is sharded across different actors\n",
    "running on separate nodes. Those actors communicate with each other to\n",
    "create the final model.\n",
    "\n",
    "First, the dataframes are wrapped in ``RayDMatrix`` objects, which handle\n",
    "data sharding across the cluster. Then, the ``train`` function is called.\n",
    "The evaluation scores will be saved to ``evals_result`` dictionary. The\n",
    "function returns a tuple of the trained model (booster) and the evaluation\n",
    "scores.\n",
    "\n",
    "The ``ray_params`` variable expects a ``RayParams`` object that contains\n",
    "Ray-specific settings, such as the number of workers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b8957b3",
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_xgboost(config, train_df, test_df, target_column, ray_params):\n",
    "    train_set = RayDMatrix(train_df, target_column)\n",
    "    test_set = RayDMatrix(test_df, target_column)\n",
    "\n",
    "    evals_result = {}\n",
    "\n",
    "    train_start_time = time.time()\n",
    "\n",
    "    # Train the classifier\n",
    "    bst = train(\n",
    "        params=config,\n",
    "        dtrain=train_set,\n",
    "        evals=[(test_set, \"eval\")],\n",
    "        evals_result=evals_result,\n",
    "        ray_params=ray_params,\n",
    "    )\n",
    "\n",
    "    train_end_time = time.time()\n",
    "    train_duration = train_end_time - train_start_time\n",
    "    print(f\"Total time taken: {train_duration} seconds.\")\n",
    "\n",
    "    model_path = \"model.xgb\"\n",
    "    bst.save_model(model_path)\n",
    "    print(\"Final validation error: {:.4f}\".format(evals_result[\"eval\"][\"error\"][-1]))\n",
    "\n",
    "    return bst, evals_result"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8edbeab5",
   "metadata": {},
   "source": [
    "We can now pass our Dask dataframes and run the function. We will use\n",
    "``RayParams`` to specify that the number of actors and CPUs to train with.\n",
    "\n",
    "The dataset has to be downloaded onto the cluster, which may take a few\n",
    "minutes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b48ea01",
   "metadata": {},
   "outputs": [],
   "source": [
    "# standard XGBoost config for classification\n",
    "config = {\n",
    "    \"tree_method\": \"approx\",\n",
    "    \"objective\": \"binary:logistic\",\n",
    "    \"eval_metric\": [\"logloss\", \"error\"],\n",
    "}\n",
    "\n",
    "bst, evals_result = train_xgboost(\n",
    "    config,\n",
    "    train_df,\n",
    "    eval_df,\n",
    "    LABEL_COLUMN,\n",
    "    RayParams(cpus_per_actor=cpus_per_actor, num_actors=num_actors),\n",
    ")\n",
    "print(f\"Results: {evals_result}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c04c4e3",
   "metadata": {},
   "source": [
    "## Hyperparameter optimization\n",
    "\n",
    "If we are not content with the results obtained with default XGBoost\n",
    "parameters, we can use [Ray Tune](https://docs.ray.io/en/latest/tune/index.html) for cutting-edge\n",
    "distributed hyperparameter tuning. XGBoost-Ray automatically integrates\n",
    "with Ray Tune, meaning we can use the same training function as before.\n",
    "\n",
    "In this workflow, we will tune three hyperparameters - ``eta``, ``subsample``\n",
    "and ``max_depth``. We are using [Tune's samplers to define the search\n",
    "space](https://docs.ray.io/en/latest/tune/user-guide.html#search-space-grid-random).\n",
    "\n",
    "The experiment configuration is done through ``tune.run``. We set the amount\n",
    "of resources each trial (hyperparameter combination) requires by using the\n",
    "``get_tune_resources`` method of ``RayParams``. The ``num_samples`` argument\n",
    "controls how many trials will be ran in total. In the end, the best\n",
    "combination of hyperparameters evaluated during the experiment will be\n",
    "returned.\n",
    "\n",
    "By default, Tune will use simple random search. However, Tune also\n",
    "provides various [search algorithms](https://docs.ray.io/en/latest/tune/api_docs/suggestion.html) and\n",
    "[schedulers](https://docs.ray.io/en/latest/tune/api_docs/schedulers.html)\n",
    "to further improve the optimization process."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bde33d2a",
   "metadata": {},
   "outputs": [],
   "source": [
    "def tune_xgboost(train_df, test_df, target_column):\n",
    "    # Set XGBoost config.\n",
    "    config = {\n",
    "        \"tree_method\": \"approx\",\n",
    "        \"objective\": \"binary:logistic\",\n",
    "        \"eval_metric\": [\"logloss\", \"error\"],\n",
    "        \"eta\": tune.loguniform(1e-4, 1e-1),\n",
    "        \"subsample\": tune.uniform(0.5, 1.0),\n",
    "        \"max_depth\": tune.randint(1, 9),\n",
    "    }\n",
    "\n",
    "    ray_params = RayParams(\n",
    "        max_actor_restarts=1, cpus_per_actor=cpus_per_actor, num_actors=num_actors\n",
    "    )\n",
    "\n",
    "    tune_start_time = time.time()\n",
    "\n",
    "    analysis = tune.run(\n",
    "        tune.with_parameters(\n",
    "            train_xgboost,\n",
    "            train_df=train_df,\n",
    "            test_df=test_df,\n",
    "            target_column=target_column,\n",
    "            ray_params=ray_params,\n",
    "        ),\n",
    "        # Use the `get_tune_resources` helper function to set the resources.\n",
    "        resources_per_trial=ray_params.get_tune_resources(),\n",
    "        config=config,\n",
    "        num_samples=10,\n",
    "        metric=\"eval-error\",\n",
    "        mode=\"min\",\n",
    "    )\n",
    "\n",
    "    tune_end_time = time.time()\n",
    "    tune_duration = tune_end_time - tune_start_time\n",
    "    print(f\"Total time taken: {tune_duration} seconds.\")\n",
    "\n",
    "    accuracy = 1.0 - analysis.best_result[\"eval-error\"]\n",
    "    print(f\"Best model parameters: {analysis.best_config}\")\n",
    "    print(f\"Best model total accuracy: {accuracy:.4f}\")\n",
    "\n",
    "    return analysis.best_config"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0f52fbc0",
   "metadata": {},
   "source": [
    "Hyperparameter optimization may take some time to complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8836cc07",
   "metadata": {},
   "outputs": [],
   "source": [
    "tune_xgboost(train_df, eval_df, LABEL_COLUMN)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "82a1e0c8",
   "metadata": {},
   "source": [
    "## Prediction\n",
    "\n",
    "With the model trained, we can now predict on unseen data. For the\n",
    "purposes of this example, we will use the same dataset for prediction as\n",
    "for training.\n",
    "\n",
    "Since prediction is naively parallelizable, distributing it over multiple\n",
    "actors can measurably reduce the amount of time needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb1f0689",
   "metadata": {},
   "outputs": [],
   "source": [
    "inference_df = RayDMatrix(data, ignore=[LABEL_COLUMN, \"partition\"])\n",
    "results = predict(\n",
    "    bst,\n",
    "    inference_df,\n",
    "    ray_params=RayParams(\n",
    "        cpus_per_actor=cpus_per_actor_inference, num_actors=num_actors_inference\n",
    "    ),\n",
    ")\n",
    "\n",
    "print(results)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}